feature: Implement saving operations to / reading from the on-disk storage #149
Conversation
Force-pushed from 92ee4aa to 7defd81
    yield operation, create_run.sequence_id, create_run.timestamp

    if not operations:
        yield None
yield is not like return; lines 305-307 will still be executed.
Fixed. I've added tests for stream_attributes.
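The reviewer's point about yield can be shown with a minimal, self-contained sketch (the function body and values here are illustrative, not the PR's actual code):

```python
def stream(operations):
    # Sketch of the reviewed generator: `yield` suspends the function,
    # it does not exit it.
    if not operations:
        yield None
    # Unlike `return`, execution resumes here on the next `next()` call,
    # so the lines below (the reviewer's 305-307) still run.
    yield "still reached"

print(list(stream([])))  # [None, 'still reached']
```

Fixing it requires an explicit `return` (or restructured control flow) after yielding the sentinel.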
src/neptune_scale/api/attribute.py
Outdated
    @property
    def last_timestamp(self) -> Optional[float]:
        with self._lock:
            return self._last_timestamp
Should keeping last_sequence_id and last_timestamp be the responsibility of AttributeStore?
You are passing AttributeStore to LagTracker just to read this timestamp, even though LagTracker is not interested in attribute values.
And you are manually updating the timestamp after creating a Run, even though that has nothing to do with attribute values.
It seems to belong more to the OperationsRepository itself, or maybe to some class in the middle:
.. <-> OperationsService <-> OperationsRepository
In my opinion, adding a class holding the last timestamp and last sequence id will be fine. I considered moving the 'create run' functionality to AttributeStore, but it's not a good solution.
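A minimal sketch of such a middle class, with both values kept behind a lock. The name SequenceTracker and the exact method signatures are assumptions for illustration; the PR's final shape may differ:

```python
import threading
from typing import Optional

class SequenceTracker:
    """Hypothetical middle-layer class: tracks the last submitted
    sequence id and timestamp, independent of attribute values."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._last_sequence_id: int = -1
        self._last_timestamp: Optional[float] = None

    @property
    def last_sequence_id(self) -> int:
        with self._lock:
            return self._last_sequence_id

    @property
    def last_timestamp(self) -> Optional[float]:
        with self._lock:
            return self._last_timestamp

    def update_sequence_id(self, sequence_id: int,
                           timestamp: Optional[float] = None) -> None:
        with self._lock:
            # Ignore out-of-order ids so the tracker stays monotonic.
            self._last_sequence_id = max(self._last_sequence_id, sequence_id)
            if timestamp is not None:
                self._last_timestamp = timestamp
```

With this, LagTracker reads the tracker instead of AttributeStore, and the create-run path updates it directly.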
Force-pushed from b1e64bb to fca2e4c
Force-pushed from e4a4f03 to 07ff04d
@@ -57,7 +57,7 @@ def __init__(
         metrics: Optional[Metrics],
         add_tags: Optional[dict[str, Union[list[str], set[str], tuple[str]]]],
         remove_tags: Optional[dict[str, Union[list[str], set[str], tuple[str]]]],
-        max_message_bytes_size: int = 1024 * 1024,
+        max_message_bytes_size: int = 8 * 1024 * 1024,  # 8 MB
Why 8? Don't we have 16 in the API?
Set to 2MB
@@ -692,3 +711,8 @@ def print_message(msg: str, *args: Any, last_print: Optional[float] = None, verb
         return current_time

     return last_print


 def _create_repository_path(project: str, run_id: str) -> Path:
TODO: ideally we need to come up with something unambiguous but usable (URL encoding is not usable). Need to think some more.
    yield operation, create_run.sequence_id, create_run.ts

    if not operations:
        continue
We need some backoff here - 0.1s sleep for now.
We use a Daemon thread (class Daemon(threading.Thread)) with sleep time SYNC_THREAD_SLEEP_TIME = 0.5. If the list is empty, the loop in work() finishes and the thread goes to sleep.
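The polling/sleep behavior described in that reply can be sketched as follows. The class shape and names here are assumptions based on the quoted snippet, not the library's actual implementation:

```python
import threading

SYNC_THREAD_SLEEP_TIME = 0.5  # value quoted in the reply above

class Daemon(threading.Thread):
    """Sketch of the described worker: poll, then sleep between iterations."""

    def __init__(self, sleep_time: float, work_fn) -> None:
        super().__init__(daemon=True)
        self._sleep_time = sleep_time
        self._work_fn = work_fn
        self._interrupted = threading.Event()

    def run(self) -> None:
        while not self._interrupted.is_set():
            # work() processes the pending list; if the list was empty it
            # returns immediately and the wait below acts as the backoff.
            self._work_fn()
            self._interrupted.wait(self._sleep_time)

    def interrupt(self) -> None:
        self._interrupted.set()
```

Using Event.wait() instead of time.sleep() lets interrupt() wake the thread promptly during shutdown.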
@@ -464,7 +343,32 @@ def _raise_exception(status_code: int) -> None:
         raise NeptuneUnexpectedResponseError()


-class StatusTrackingThread(Daemon, WithResources):
+def _merge_operations(operations: list[Operation]) -> tuple[UpdateRunSnapshots, SequenceId, datetime.datetime]:
IMO this should be inlined.
removed
    logger.debug("Submitting operation #%d with size of %d bytes", sequence_id, len(data))
    run_operation = RunOperation()
    run_operation.ParseFromString(data)
    logger.debug("Submitting operation #%d with size of %d bytes", sequence_id, len(""))
Either remove or fill with something sensible (ByteSize is probably too heavy, but maybe snapshot count / create run count, or something?)
I will add an if for the debug level:

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            "Submitting operation #%d with size of %d bytes", sequence_id, run_operation.ByteSize()
        )
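The reason the isEnabledFor guard helps: %-style formatting in logger.debug is deferred, but evaluating the arguments (like ByteSize()) is not. A runnable illustration with a stand-in for the expensive call (logger name and byte_size are illustrative):

```python
import logging

logging.basicConfig(level=logging.INFO)  # DEBUG is disabled here
logger = logging.getLogger("neptune_demo")  # illustrative name

expensive_calls = []

def byte_size() -> int:
    # Stand-in for run_operation.ByteSize(): records that it was computed.
    expensive_calls.append(1)
    return 1024

# The guard skips evaluating byte_size() entirely when DEBUG is off;
# lazy %-formatting alone would not prevent the argument evaluation.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("Submitting operation #%d with size of %d bytes", 1, byte_size())

print(len(expensive_calls))  # 0: the stand-in was never called
```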
    assert tracker.last_sequence_id == 10
    assert tracker.last_timestamp == 123.456

    # Update with a lower sequence ID
Can this actually happen?
Without a lock in the attribute store: yes. We can add a lock here:

    operations = list(splitter)
    sequence_id = self._operations_repo.save_update_run_snapshots(operations)
    self._sequence_tracker.update_sequence_id(sequence_id)
tests/unit/test_stream_operations.py
Outdated
@@ -0,0 +1,167 @@
+import time
I'm quite against this entire test. We're testing internal implementation instead of behavior.
What we should do instead is write a unit test for the sync process, mocking the repository and the backend client.
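A sketch of that behavior-level style of test, with both collaborators mocked. Here sync_once and the method names (get_pending, submit, ack) are illustrative stand-ins, not the PR's actual API:

```python
from unittest import mock

def sync_once(repo, backend):
    """Toy sync step: read pending operations, submit them, ack on success."""
    ops = repo.get_pending()
    if not ops:
        return 0
    backend.submit(ops)
    repo.ack(len(ops))
    return len(ops)

# Behavior-level test: assert on the interactions, not on internals.
repo = mock.Mock()
repo.get_pending.return_value = ["op1", "op2"]
backend = mock.Mock()

assert sync_once(repo, backend) == 2
backend.submit.assert_called_once_with(["op1", "op2"])
repo.ack.assert_called_once_with(2)
```

The test survives internal refactors as long as the observable repo/backend contract holds.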
@@ -242,6 +259,9 @@ def get_metadata(self) -> Optional[Metadata]:

         version, project, run_id, parent_run_id, fork_step = row

+        if version != DB_VERSION:
+            raise NeptuneLocalStorageInUnsupportedVersion()
Maybe we should move the validation to the constructor or a dedicated method.
Left a comment elsewhere - let's remove it from here.
src/neptune_scale/api/run.py
Outdated
@@ -280,12 +279,15 @@ def _check_for_run_conflicts(
 ) -> None:
I think this method should become something like _validate_existing_db(), and within it there should be:
- Check if the version matches. If not -> NeptuneLocalStorageInUnsupportedVersion
- Check if all other metadata matches. If not -> NeptuneConflictingDataInLocalStorage; if it does -> warning.
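That flow might look roughly like this. The function signature, dict-based metadata, and the constant value are assumptions for illustration; only the check order and exception names come from the comment above:

```python
import warnings

DB_VERSION = "v1"  # illustrative; the real constant lives in the PR

class NeptuneLocalStorageInUnsupportedVersion(Exception):
    pass

class NeptuneConflictingDataInLocalStorage(Exception):
    pass

def validate_existing_db(metadata: dict, project: str, run_id: str) -> None:
    """Sketch of the suggested _validate_existing_db flow."""
    # 1. Version check first: an unsupported schema can't be compared further.
    if metadata["version"] != DB_VERSION:
        raise NeptuneLocalStorageInUnsupportedVersion()
    # 2. Any mismatch in the remaining metadata is a conflict.
    if metadata["project"] != project or metadata["run_id"] != run_id:
        raise NeptuneConflictingDataInLocalStorage()
    # 3. Everything matches: we are resuming an existing run, so just warn.
    warnings.warn(f"Resuming run {run_id} from existing local storage")
```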
* feat: Implement OperationsRepository
* feat: Add test
* feat: Add test
* feat: Review
* feat: Add dataclass for metadata
* feat: Add dataclass for metadata - precommit
* feat: Move init db
* fix: Refactor OperationsRepository: replace window function. (#150)
* feature: Implement saving operations to / reading from the on-disk storage (#149)
* feat: add sync cli
* reuse SyncProcess internals in sync cli
* remove sync-no-parent flag
* implement sync using SyncProcess
* Add progress bar
* Add tests
* change tqdm version range
* feat: add offline mode (disable unnecessary components in disabled mode)
* add tests
* Allow providing base directory for log files, using env var NEPTUNE_LOG_DIRECTORY or a new Run constructor argument log_directory
* OperationsRepository requires an absolute db path
* Adjust Run log_directory arg; tests for absolute/relative paths
* Apply suggestions from code review: simpler tests and path resolve logic, don't verify log_directory being not empty
* fix errors after merge
* add e2e tests
* feat: delete sqlite files after exit (#154); ignore test_cleanup_repository_conflict
* Set NEPTUNE_PROJECT in e2e tests
* When SyncProcess crashes the Run remains uninterrupted
* Give some time for background processing after killing the sync process in tests
* feat: delete operations on ack instead of send (#156); add tests; fix code style
* Add timestamp to operations repository path (#157); fix: convert arg to Path in click command; remove db validate
* Remove fork info from operations db
* sleep in tests

Co-authored-by: Patryk Gala <patryk.gala@neptune.ai>
Co-authored-by: Michał Sośnicki <michal.sosnicki@neptune.ai>
Co-authored-by: Krzysztof Godlewski <krzysztof.godlewski@neptune.ai>
Co-authored-by: Piotr Łusakowski <piotr.lusakowskI@neptune.ai>